package com.shujia.flink.core

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object Demo17ExactlyOnce {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing

    // Start a checkpoint every 20 seconds (20000 ms)
    env.enableCheckpointing(20000)
    // Advanced options:
    // Set the checkpointing mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Make sure at least 500 ms elapse between the end of one checkpoint and the start of the next
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // A checkpoint must complete within one minute, otherwise it is discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // Tolerate up to two consecutive checkpoint failures before the job fails
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)

    // Allow only one checkpoint to be in progress at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // Use externalized checkpoints so the checkpoint is retained after the job is cancelled
    // RETAIN_ON_CANCELLATION: keep the checkpoint when the job is cancelled
    env.getCheckpointConfig.setExternalizedCheckpointCleanup(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
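
    // A retained checkpoint can later be used to restart the job, for example via the Flink CLI.
    // The path and jar name below are placeholders; use the actual <job-id>/chk-<n> directory:
    //   flink run -s hdfs://master:9000/flink/checkpoint/<job-id>/chk-<n> -c com.shujia.flink.core.Demo17ExactlyOnce flink-demo.jar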

    /**
     * Configure where Flink stores its checkpointed state.
     */

    // Keep working state on the TaskManager JVM heap
    env.setStateBackend(new HashMapStateBackend())
    // Persist checkpoints to HDFS
    env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
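    // For local testing, a filesystem path works as well (sketch; the path below is just an example):
    // env.getCheckpointConfig.setCheckpointStorage("file:///tmp/flink/checkpoint")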


    val source: KafkaSource[String] = KafkaSource.builder[String]
      .setBootstrapServers("master:9092,node1:9092,node2:9092")
      .setTopics("source")
      .setGroupId("Demo17ExactlyOnce")
      .setStartingOffsets(OffsetsInitializer.earliest) // Only takes effect on the first start; with checkpointing enabled, a restarted job resumes from the offsets saved in the checkpoint
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build
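
    // Other starting-offset strategies are available, e.g. (commented-out sketch, example timestamp):
    // OffsetsInitializer.latest()                  // start from the latest offsets
    // OffsetsInitializer.timestamp(1700000000000L) // start from the first record at/after a timestamp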

    val kafkaSource: DataStream[String] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    // Filter out empty records
    val filterDS: DataStream[String] = kafkaSource.filter(_.nonEmpty)

    /**
     * DeliveryGuarantee.AT_LEAST_ONCE: at least once; records may be duplicated after a failure
     * DeliveryGuarantee.EXACTLY_ONCE: exactly once; records are written in Kafka transactions
     * that are committed only when a checkpoint completes
     */

    // Write the cleaned data back to Kafka
    val properties = new Properties()
    // Set the producer transaction timeout; it must not exceed the broker's
    // transaction.max.timeout.ms, which defaults to 15 minutes
    properties.setProperty("transaction.timeout.ms", 10 * 60 * 1000 + "")
    val kafkaSink: KafkaSink[String] = KafkaSink
      .builder[String]()
      .setBootstrapServers("master:9092,node1:9092,node2:9092") // Broker addresses
      .setKafkaProducerConfig(properties) // Extra producer properties
      .setRecordSerializer(
        KafkaRecordSerializationSchema
          .builder[String]()
          .setTopic("sink") // Target topic
          .setValueSerializationSchema(new SimpleStringSchema())
          .build())
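      // When several exactly-once jobs write to the same Kafka cluster, setting a unique
      // transactional id prefix avoids transactional id clashes (the value here is just an example):
      // .setTransactionalIdPrefix("Demo17ExactlyOnce")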
      .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
      .build()


    filterDS.sinkTo(kafkaSink)

    /**
     * Consume the sink topic from the command line.
     * --isolation-level read_committed: only read committed data, so records from
     * transactions that have not been committed yet are not visible
     * kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --isolation-level read_committed --from-beginning --topic sink
     */
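
    /**
     * To feed the source topic for testing, a console producer can be used
     * (the flag name may vary with the Kafka version):
     * kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic source
     */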


    env.execute()
  }

}
